package com.example.sparkdemo.jksj

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/**
 *
 *
 * @author zhang.t.c
 * @date 2021/8/23
 */
/**
 * Builds an inverted index over a directory of text files: for every word,
 * the list of (filename, occurrence-count) pairs in which it appears.
 *
 * Pipeline: read whole files -> tokenize -> count per (word, file) ->
 * re-key by word -> collect and group on the driver.
 */
object ReverseIndex {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark1")
    val sc = new SparkContext(sparkConf)

    // wholeTextFiles yields (fullPath, fileContent) pairs, e.g.
    // (file:/D:/demo/spark-demo/datas/reverseindex/0, "it is what it is").
    // Note the path always uses forward slashes, even on Windows.
    val files: RDD[(String, String)] = sc.wholeTextFiles("D:\\demo\\spark-demo\\datas\\reverseindex")

    // Count occurrences of each (word, filename) pair.
    val wordFileCount: RDD[((String, String), Int)] = files
      .flatMap { case (path, content) =>
        val filename = path.split("/").last
        // Split on runs of any whitespace (spaces, tabs, newlines) so
        // multi-line files and repeated separators don't yield bogus tokens;
        // a leading separator can still produce one empty token, so drop it.
        content.split("\\s+")
          .filter(_.nonEmpty)
          .map(word => ((word, filename), 1))
      }
      .reduceByKey(_ + _)

    // Re-key by word and bring the (small) result back to the driver.
    val wordFileCountRes: Array[(String, (String, Int))] = wordFileCount
      .map { case ((word, file), count) => (word, (file, count)) }
      .collect()

    // word -> list of (filename, count). Immutable groupBy replaces the
    // former hand-rolled mutable.HashMap + ListBuffer accumulation.
    val index: Map[String, List[(String, Int)]] = wordFileCountRes
      .groupBy { case (word, _) => word }
      .map { case (word, entries) => word -> entries.map(_._2).toList }
    index.foreach(println)

    sc.stop()
  }

}
